Consuming Events
Advanced patterns for event handling, filtering, and error recovery
This guide covers advanced patterns for consuming events in production applications. For basic event handling, see Event Handling.
Observability Events (Disabled by Default)
By default, observability events are NOT emitted, so you don't need to filter them in most applications.
Observability events (IObservabilityEvent) are internal diagnostic events for logging, metrics, and debugging. Examples include MiddlewareProgressEvent, IterationStartEvent, and CheckpointEvent.
Enabling Observability Events (Optional)
If you need internal diagnostics for debugging or monitoring:
var agent = new AgentBuilder()
.WithConfig(new AgentConfig
{
Observability = new ObservabilityConfig
{
EmitObservabilityEvents = true // Enable for debugging
}
})
.Build();
// When enabled, filter them in your event loop:
await foreach (var evt in agent.RunAsync(messages))
{
if (evt is IObservabilityEvent) continue; // Filter out
// ... handle user-facing events
}
For most applications: Leave EmitObservabilityEvents = false (default) and skip the filter entirely.
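When the events are enabled and several loops need the same filter, the check can be factored into a small extension method. The following is a minimal sketch, not a library API: WithoutObservabilityEvents is a hypothetical name, the stub types only stand in for the real AgentEvent and IObservabilityEvent so the snippet compiles in isolation, and it assumes agent.RunAsync returns IAsyncEnumerable<AgentEvent>.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// Stand-in types so this sketch compiles on its own. In a real project,
// delete them and reference the library's AgentEvent and IObservabilityEvent.
public interface IObservabilityEvent { }
public abstract class AgentEvent { }
public sealed class TextDeltaEvent : AgentEvent { public string Text { get; init; } = ""; }
public sealed class IterationStartEvent : AgentEvent, IObservabilityEvent { }

public static class AgentEventStreamExtensions
{
    // Hypothetical helper (assumption, not part of the library):
    // yields every event except those marked as internal diagnostics.
    public static async IAsyncEnumerable<AgentEvent> WithoutObservabilityEvents(
        this IAsyncEnumerable<AgentEvent> source)
    {
        await foreach (var evt in source)
        {
            if (evt is IObservabilityEvent) continue; // skip internal diagnostics
            yield return evt;
        }
    }
}
```

Under those assumptions, a consuming loop would read `await foreach (var evt in agent.RunAsync(messages).WithoutObservabilityEvents())`, keeping the filter in one place instead of repeating it in every loop.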
Core Patterns
Pattern 1: Direct Iteration (Most Common)
The fundamental pattern for consuming events:
await foreach (var evt in agent.RunAsync(messages))
{
switch (evt)
{
case TextDeltaEvent delta:
Console.Write(delta.Text);
break;
case ToolCallStartEvent toolStart:
Console.WriteLine($"\n[Calling: {toolStart.ToolName}]");
break;
case MessageTurnFinishedEvent:
Console.WriteLine("\n✓ Done");
break;
case PermissionRequestEvent permission:
var approved = await PromptUserAsync(permission);
await agent.SendResponseAsync(permission.PermissionId,
new PermissionResponseEvent
{
PermissionId = permission.PermissionId,
Approved = approved
});
break;
}
}
Pattern 2: Message Accumulation
Build complete messages from streaming deltas:
var currentMessage = new StringBuilder();
await foreach (var evt in agent.RunAsync(messages))
{
switch (evt)
{
case TextDeltaEvent delta:
currentMessage.Append(delta.Text);
Console.Write(delta.Text);
break;
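case TextMessageEndEvent:
// Message complete - save to database and add to conversation history
var fullMessage = currentMessage.ToString();
await SaveMessageAsync(fullMessage);
// Record the text before clearing the buffer; reading the buffer
// later (at turn end) would yield an empty string
messages.Add(new ChatMessage
{
Role = "assistant",
Content = fullMessage
});
currentMessage.Clear();
break;
case MessageTurnFinishedEvent:
// Turn complete - the buffer was already flushed on TextMessageEndEvent
break;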
}
}
Pattern 3: Event Handler Registration
For complex UIs, use event handlers:
public class AgentEventRouter
{
private readonly Dictionary<Type, List<Func<AgentEvent, Task>>> _handlers = new();
public void On<TEvent>(Func<TEvent, Task> handler) where TEvent : AgentEvent
{
var type = typeof(TEvent);
if (!_handlers.ContainsKey(type))
_handlers[type] = new();
_handlers[type].Add(evt => handler((TEvent)evt));
}
public async Task RouteAsync(AgentEvent evt)
{
if (_handlers.TryGetValue(evt.GetType(), out var handlers))
{
foreach (var handler in handlers)
await handler(evt);
}
}
}
// Usage
var router = new AgentEventRouter();
router.On<TextDeltaEvent>(async delta =>
{
await UpdateUIAsync(delta.Text);
});
router.On<ToolCallStartEvent>(async toolStart =>
{
await ShowToolIndicatorAsync(toolStart.ToolName);
});
router.On<MessageTurnFinishedEvent>(async _ =>
{
await SetLoadingStateAsync(false);
});
// Route all events
await foreach (var evt in agent.RunAsync(messages))
{
await router.RouteAsync(evt);
}
Filtering Patterns
Filter Observability Events
When EmitObservabilityEvents is enabled, do this first to prevent console spam:
await foreach (var evt in agent.RunAsync(messages))
{
// FIRST LINE: Filter out internal diagnostics
if (evt is IObservabilityEvent) continue;
// Now handle user-facing events
switch (evt) { ... }
}
Filter by Event Category
Use marker interfaces to filter categories:
await foreach (var evt in agent.RunAsync(messages))
{
// Only handle bidirectional events
if (evt is IBidirectionalEvent bidirectional)
{
await HandleBidirectionalAsync(bidirectional);
continue;
}
// Only handle permission events
if (evt is IPermissionEvent permission)
{
await HandlePermissionAsync(permission);
continue;
}
}
Filter by Execution Context
Filter events from nested agents (see SubAgent Events):
await foreach (var evt in agent.RunAsync(messages))
{
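// Option A: only process root agent events
if (evt.ExecutionContext?.IsSubAgent == true) continue;
// Option B (use instead of A): only process events from a specific agent.
// Note: events with a null ExecutionContext are skipped too, because null != "MainAgent".
// if (evt.ExecutionContext?.AgentName != "MainAgent") continue;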
HandleEvent(evt);
}
Filter by Priority
Filter events by their routing priority:
await foreach (var evt in agent.RunAsync(messages))
{
// Only handle high-priority events
if (evt.Priority is EventPriority.Immediate or EventPriority.Control)
{
await HandleCriticalEventAsync(evt);
}
}
Error Handling Patterns
Pattern 1: Handle Turn-Level Errors
Always handle MessageTurnErrorEvent to show errors to users:
await foreach (var evt in agent.RunAsync(messages))
{
switch (evt)
{
case MessageTurnErrorEvent error:
Console.WriteLine($"\n✗ Error: {error.ErrorMessage}");
// Show error to user, disable retry button, etc.
await ShowErrorUIAsync(error.ErrorMessage);
break;
case MessageTurnFinishedEvent:
Console.WriteLine("\n✓ Success");
await SetLoadingStateAsync(false);
break;
}
}
Pattern 2: Try/Catch for Stream Errors
Handle exceptions from the stream itself:
try
{
await foreach (var evt in agent.RunAsync(messages))
{
await HandleEventAsync(evt);
}
}
catch (OperationCanceledException)
{
// User cancelled (Ctrl+C or stop button)
Console.WriteLine("\n⚠ Cancelled by user");
}
catch (Exception ex)
{
// Unexpected error (network failure, etc.)
Console.WriteLine($"\n✗ Unexpected error: {ex.Message}");
await LogErrorAsync(ex);
}
Pattern 3: Timeout Protection
Add a timeout so a stalled run cannot hang forever:
using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(5));
try
{
await foreach (var evt in agent.RunAsync(messages, cancellationToken: cts.Token))
{
await HandleEventAsync(evt);
}
}
catch (OperationCanceledException) when (cts.Token.IsCancellationRequested)
{
Console.WriteLine("\n⚠ Operation timed out after 5 minutes");
}
Cancellation Patterns
User-Initiated Cancellation
Allow users to stop long-running operations:
using var cts = new CancellationTokenSource();
// Hook up stop button
stopButton.Click += (s, e) => cts.Cancel();
try
{
await foreach (var evt in agent.RunAsync(messages, cancellationToken: cts.Token))
{
await HandleEventAsync(evt);
}
}
catch (OperationCanceledException)
{
Console.WriteLine("\n⚠ Stopped by user");
}
For detailed cancellation patterns, see Streaming & Cancellation.
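The timeout pattern and the stop-button pattern each create their own CancellationTokenSource. When both are needed in the same loop, the standard .NET way to combine them is CancellationTokenSource.CreateLinkedTokenSource. The sketch below is one possible arrangement, not the library's own mechanism: StreamOutcome and ConsumeAsync are hypothetical names, the helper is generic so it compiles without the agent types, and the run delegate stands in for a call such as agent.RunAsync(messages, cancellationToken: token).

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical result type for this sketch.
public enum StreamOutcome { Completed, TimedOut, CancelledByUser }

public static class CancellationSketch
{
    // Consumes a stream under a token that fires when either the user
    // cancels or the timeout elapses, and reports which one ended the run.
    public static async Task<StreamOutcome> ConsumeAsync<T>(
        Func<CancellationToken, IAsyncEnumerable<T>> run,
        Func<T, Task> handle,
        TimeSpan timeout,
        CancellationToken userToken)
    {
        using var timeoutCts = new CancellationTokenSource(timeout);
        using var linked = CancellationTokenSource.CreateLinkedTokenSource(
            timeoutCts.Token, userToken);
        try
        {
            await foreach (var evt in run(linked.Token))
                await handle(evt);
            return StreamOutcome.Completed;
        }
        // Checked first, so a user cancel wins if both tokens have fired.
        catch (OperationCanceledException) when (userToken.IsCancellationRequested)
        {
            return StreamOutcome.CancelledByUser;
        }
        catch (OperationCanceledException) when (timeoutCts.IsCancellationRequested)
        {
            return StreamOutcome.TimedOut;
        }
        // Any other OperationCanceledException propagates to the caller.
    }
}
```

A call might look like `await CancellationSketch.ConsumeAsync(token => agent.RunAsync(messages, cancellationToken: token), HandleEventAsync, TimeSpan.FromMinutes(5), stopCts.Token)`, where stopCts is the source wired to the stop button. Checking the individual tokens in the exception filters is what lets the UI show "timed out" and "stopped by user" as different messages.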
Performance Patterns
Pattern 1: Buffered Updates
Reduce UI thrashing by batching text deltas:
var buffer = new StringBuilder();
var lastFlush = DateTime.UtcNow;
await foreach (var evt in agent.RunAsync(messages))
{
switch (evt)
{
case TextDeltaEvent delta:
buffer.Append(delta.Text);
// Flush every 50ms or when buffer is full
if ((DateTime.UtcNow - lastFlush).TotalMilliseconds > 50 || buffer.Length > 100)
{
await UpdateUIAsync(buffer.ToString());
buffer.Clear();
lastFlush = DateTime.UtcNow;
}
break;
case MessageTurnFinishedEvent:
// Final flush
if (buffer.Length > 0)
{
await UpdateUIAsync(buffer.ToString());
buffer.Clear();
}
await SetLoadingStateAsync(false);
break;
}
}
Pattern 2: Async Event Handlers
Prevent blocking the event stream:
await foreach (var evt in agent.RunAsync(messages))
{
// Don't await slow operations inline
switch (evt)
{
case ToolCallResultEvent result:
// Fire and forget for logging
_ = LogToolResultAsync(result);
break;
case TextDeltaEvent delta:
// Await UI updates (important for correctness)
await UpdateUIAsync(delta.Text);
break;
}
}
Common Mistakes
Not Filtering Observability Events
// WRONG (with EmitObservabilityEvents = true): logs 20+ internal diagnostics per turn
await foreach (var evt in agent.RunAsync(messages))
{
Console.WriteLine(evt.GetType().Name);
}
// CORRECT: Filter first
await foreach (var evt in agent.RunAsync(messages))
{
if (evt is IObservabilityEvent) continue;
Console.WriteLine(evt.GetType().Name);
}
Blocking the Event Stream
// WRONG: Thread.Sleep blocks the thread, stalling the stream for a second per delta
await foreach (var evt in agent.RunAsync(messages))
{
if (evt is TextDeltaEvent delta)
{
Thread.Sleep(1000); // DON'T DO THIS!
Console.Write(delta.Text);
}
}
// CORRECT: Never block the stream
await foreach (var evt in agent.RunAsync(messages))
{
if (evt is TextDeltaEvent delta)
{
await UpdateUIAsync(delta.Text); // Async is fine
}
}
Not Handling MessageTurnFinishedEvent
// WRONG: Loading spinner never stops
await foreach (var evt in agent.RunAsync(messages))
{
if (evt is TextDeltaEvent delta)
Console.Write(delta.Text);
// Missing: MessageTurnFinishedEvent handler!
}
// CORRECT: Always handle completion
await foreach (var evt in agent.RunAsync(messages))
{
switch (evt)
{
case TextDeltaEvent delta:
Console.Write(delta.Text);
break;
case MessageTurnFinishedEvent:
Console.WriteLine("\n✓ Done");
break;
}
}
See Also
- Events Overview - Event lifecycle and categories
- Event Types Reference - Complete event listing
- Bidirectional Events - Request/response patterns
- Streaming & Cancellation - Advanced streaming topics
- SubAgent Events - Nested agent filtering